package com.dc.schedule.server.service.client.impl;

import com.dc.schedule.api.base.WaitResponseLock;
import com.dc.schedule.api.context.FireTaskContext;
import com.dc.schedule.api.model.ClientTaskNotifyMessage;
import com.dc.schedule.api.model.SocketClientRegistryMessage;
import com.dc.schedule.api.model.SocketFireTaskMessage;
import com.dc.schedule.api.model.SocketMessage;
import com.dc.schedule.api.model.SocketStatusMessage;
import com.dc.schedule.api.model.StaticTaskRegistry;
import com.dc.schedule.server.base.TransactionalExecutor;
import com.dc.schedule.server.config.ServerConfig;
import com.dc.schedule.server.domain.AppRegistryEntity;
import com.dc.schedule.server.domain.ClientRegistryEntity;
import com.dc.schedule.server.domain.ServerRegistryEntity;
import com.dc.schedule.server.domain.StaticTaskEntity;
import com.dc.schedule.server.model.ClientConnectionEvent;
import com.dc.schedule.server.model.ClientDisconnectEvent;
import com.dc.schedule.server.model.ClientExceptionEvent;
import com.dc.schedule.server.model.ClientMessageEvent;
import com.dc.schedule.server.repository.AppRegistryRepository;
import com.dc.schedule.server.repository.ClientRegistryRepository;
import com.dc.schedule.server.repository.StaticTaskRepository;
import com.dc.schedule.server.service.client.ClientConnectionService;
import com.dc.schedule.server.service.client.ClientManageService;
import com.dc.schedule.server.service.server.ServerRegistryService;
import com.dc.schedule.server.service.task.TaskManageService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Consumes client connection events and reacts to them: completes pending
 * request/response exchanges, registers clients together with their static
 * tasks, forwards task status notifications and cleans up after disconnects.
 *
 * <p>A single loop submitted to {@code clientEventLoopPool} takes events from
 * {@link ClientConnectionService}; each event is then handled on
 * {@code coreThreadExecutor}.
 *
 * @author Diamon.Cheng
 * @date 2022/7/23.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class ClientManageServiceImpl implements ClientManageService, InitializingBean, DisposableBean {
    private final ClientConnectionService clientConnectionService;
    private final ThreadPoolTaskExecutor coreThreadExecutor;
    private final ThreadPoolTaskExecutor clientEventLoopPool;
    private final AppRegistryRepository appRegistryRepository;
    private final ClientRegistryRepository clientRegistryRepository;
    private final StaticTaskRepository staticTaskRegistryRepository;
    private final ServerRegistryService serverRegistryService;
    private final TransactionalExecutor transactionalExecutor;
    
    private final TaskManageService taskManageService;
    private final ServerConfig serverConfig;
    /** Loop flag read by {@link #loopEvents()}; cleared by {@link #destroy()}. */
    private volatile boolean running = true;
    
    /** Maps the connection-level inner client id to the registered client key ({@code appName#clientId}). */
    private final Map<String, String> clientInnerIdClientKeyMap = new ConcurrentHashMap<>();
    /** Pending request/response exchanges keyed by request uid. */
    private final Map<String, WaitResponseLock<SocketMessage>> responseWaitMap = new ConcurrentHashMap<>(3);
    
    /**
     * Asks the event loop to stop; the loop exits the next time it checks the flag.
     */
    @Override
    public void destroy() {
        running = false;
    }
    
    /**
     * Starts the event loop on {@code clientEventLoopPool}.
     */
    @Override
    public void afterPropertiesSet() {
        clientEventLoopPool.execute(this::loopEvents);
    }
    
    /**
     * Handles one message received from a client.
     *
     * <p>A message whose request uid matches a pending exchange is handed to the waiter and
     * nothing else happens. Otherwise status messages are ignored, registry messages register
     * the client, task notifications refresh the execution status, and every non-status message
     * is acknowledged with an OK status. If processing fails, an error status carrying the
     * exception's simple class name and message is sent back instead.
     *
     * @param event the message event, carrying the message and the inner client id
     */
    protected void onClientMessage(ClientMessageEvent event) {
        final SocketMessage socketMessage = event.getSocketMessage();
        log.info("接收到客户端消息: {}", socketMessage);
        if (socketMessage.getRequestUid() != null) {
            final WaitResponseLock<SocketMessage> waitResponseLock =
                    responseWaitMap.remove(socketMessage.getRequestUid());
            if (waitResponseLock != null) {
                // This message answers a request issued by sendMessageForResponse.
                waitResponseLock.setResponse(socketMessage);
                return;
            }
        }
        if (socketMessage instanceof SocketStatusMessage) {
            // Unsolicited status messages need no handling and no acknowledgement.
            return;
        }
        try {
            if (socketMessage instanceof SocketClientRegistryMessage) {
                final List<StaticTaskEntity> staticTasks = transactionalExecutor.execute(
                        () -> processClientRegistryMessage(
                                (SocketClientRegistryMessage) socketMessage, event.getInnerClientId()));
                // TODO return staticTasks to the client so that the client-side ScheduleTaskHolder
                //  can be made compatible with actuator / spring-boot-admin
            } else if (socketMessage instanceof ClientTaskNotifyMessage) {
                final ClientTaskNotifyMessage clientTaskNotifyMessage = (ClientTaskNotifyMessage) socketMessage;
                taskManageService.refreshExecutionStatus(
                        clientTaskNotifyMessage.getExecutionId(),
                        clientTaskNotifyMessage.getStatus(),
                        clientTaskNotifyMessage.getMessage()
                );
            }
            clientConnectionService.sendMessage(
                    event.getInnerClientId(),
                    SocketStatusMessage.ok(socketMessage.getRequestUid())
            );
        } catch (Exception ex) {
            log.error("处理客户端消息失败", ex);
            clientConnectionService.sendMessage(
                    event.getInnerClientId(),
                    SocketStatusMessage.error(
                            socketMessage.getRequestUid(),
                            ex.getClass().getSimpleName() + ":" + ex.getMessage()
                    )
            );
            // The server deliberately keeps the connection open for now.
        }
    }
    
    /**
     * Registers a client: ensures the app, its static tasks and the client row exist, remembers
     * the inner-id to client-key mapping and registers a handler that fires tasks on the client.
     *
     * @param message       the registry message sent by the client
     * @param innerClientId the connection-level id of the client
     * @return the static task entities belonging to this client (empty if the client declared none)
     * @throws IllegalStateException if the client key is already registered on a server that is
     *                               still alive, or if an entity can neither be inserted nor found
     */
    protected List<StaticTaskEntity> processClientRegistryMessage(SocketClientRegistryMessage message, String innerClientId) {
        final String appName = message.getAppName();
        final AppRegistryEntity appRegistry = findOrCreateAppRegistry(appName);
        final String clientId = message.getClientId();
        final List<StaticTaskRegistry> taskRegistryList = message.getStaticTasks();
        final List<StaticTaskEntity> staticTasks =
                new ArrayList<>(taskRegistryList == null ? 0 : taskRegistryList.size());
        if (taskRegistryList != null) {
            for (StaticTaskRegistry registry : taskRegistryList) {
                staticTasks.add(findOrCreateStaticTask(appRegistry, appName, registry));
            }
        }
        final String clientKey = appName + "#" + clientId;
        registerOrTakeOverClient(appRegistry, appName, clientId, clientKey, staticTasks);
        clientInnerIdClientKeyMap.put(innerClientId, clientKey);
        taskManageService.registerClientTask(staticTasks, new ClientHandler(clientKey) {
            @Override
            public void fireTask(FireTaskContext context) {
                final SocketFireTaskMessage socketFireTaskMessage = new SocketFireTaskMessage();
                socketFireTaskMessage.setTaskContext(context);
                final SocketMessage response = sendMessageForResponse(innerClientId, socketFireTaskMessage);
                if (!(response instanceof SocketStatusMessage)) {
                    // Covers both a missing response and an unexpected message type, which
                    // previously surfaced as NullPointerException / ClassCastException.
                    throw new IllegalStateException("任务调度失败, 未收到客户端的有效响应: " + response);
                }
                final SocketStatusMessage status = (SocketStatusMessage) response;
                if (!status.isSuccess()) {
                    throw new IllegalStateException("任务调度失败, " + status.getMessage());
                }
            }
        });
        
        return staticTasks;
        
    }
    
    /**
     * Inserts the app registry row, falling back to a lookup by name when the insert fails.
     */
    private AppRegistryEntity findOrCreateAppRegistry(String appName) {
        try {
            final AppRegistryEntity appRegistryToInsert = new AppRegistryEntity();
            appRegistryToInsert.setAppName(appName);
            return transactionalExecutor.execute(() -> appRegistryRepository.save(appRegistryToInsert));
        } catch (Exception e) {
            // The insert failure is presumed to be a duplicate; keep the cause visible at debug level.
            log.debug("app name [{}] already exists", appName, e);
            final AppRegistryEntity existing = appRegistryRepository.getByAppName(appName);
            if (existing == null) {
                throw new IllegalStateException("应用[" + appName + "]注册失败", e);
            }
            return existing;
        }
    }
    
    /**
     * Inserts the static task row, falling back to a lookup by task key when the insert fails.
     */
    private StaticTaskEntity findOrCreateStaticTask(AppRegistryEntity appRegistry,
                                                    String appName,
                                                    StaticTaskRegistry registry) {
        final String taskKey = appName + "#" + registry.getTaskName();
        try {
            final StaticTaskEntity staticTaskToInsert = new StaticTaskEntity();
            staticTaskToInsert.setAppRegistry(appRegistry);
            staticTaskToInsert.setTaskName(registry.getTaskName());
            staticTaskToInsert.setTaskDesc(registry.getTaskDesc());
            staticTaskToInsert.setTaskKey(taskKey);
            staticTaskToInsert.setConfigVersion(1);
            staticTaskToInsert.setTriggerExpression(registry.getTriggerExpression());
            staticTaskToInsert.setEnabled(true);
            return transactionalExecutor.execute(() -> staticTaskRegistryRepository.save(staticTaskToInsert));
        } catch (Exception e) {
            log.debug("static task key [{}] on appName [{}] already exists", taskKey, appName, e);
            final StaticTaskEntity existing = staticTaskRegistryRepository.getByTaskKey(taskKey);
            if (existing == null) {
                throw new IllegalStateException("静态任务[" + taskKey + "]注册失败", e);
            }
            return existing;
        }
    }
    
    /**
     * Inserts the client row bound to the current server. When the insert fails, the existing
     * row is taken over if its server looks dead; otherwise the registration is rejected.
     */
    private void registerOrTakeOverClient(AppRegistryEntity appRegistry,
                                          String appName,
                                          String clientId,
                                          String clientKey,
                                          List<StaticTaskEntity> staticTasks) {
        try {
            final ClientRegistryEntity clientRegistryToInsert = new ClientRegistryEntity();
            clientRegistryToInsert.setAppRegistry(appRegistry);
            clientRegistryToInsert.setServerRegistry(serverRegistryService.getCurrentServerRegistry());
            clientRegistryToInsert.setClientId(clientId);
            clientRegistryToInsert.setClientKey(clientKey);
            clientRegistryToInsert.setStaticTasks(staticTasks);
            transactionalExecutor.executeRunnable(() -> clientRegistryRepository.save(clientRegistryToInsert));
        } catch (Exception e) {
            log.debug("client id [{}] with app name [{}] already exists", clientId, appName, e);
            final ClientRegistryEntity clientRegistry = clientRegistryRepository.getByClientKey(clientKey);
            if (clientRegistry == null) {
                // The insert failed for some reason other than an existing row.
                throw new IllegalStateException("客户端[" + clientKey + "]注册失败", e);
            }
            if (!isServerDead(clientRegistry.getServerRegistry())) {
                throw new IllegalStateException("客户端["+clientKey+"]已经存在并注册, 不允许重复注册");
            }
            // The server that owned the old registration is dead, so take the client over.
            // NOTE(review): no explicit save here; presumably the surrounding transaction
            // flushes the modified entity - confirm.
            clientRegistry.setServerRegistry(serverRegistryService.getCurrentServerRegistry());
            clientRegistry.getStaticTasks().clear();
            clientRegistry.getStaticTasks().addAll(staticTasks);
        }
    }
    
    /**
     * Tells whether a server registration is missing, has no heartbeat, or has a heartbeat
     * older than the configured dead-server threshold (in seconds).
     */
    private boolean isServerDead(ServerRegistryEntity server) {
        if (server == null || server.getHeartbeatTime() == null) {
            return true;
        }
        return LocalDateTime.now()
                       .minus(serverConfig.getServerDeadHeartbeatTimeBeforeSeconds(), ChronoUnit.SECONDS)
                       .isAfter(server.getHeartbeatTime());
    }
    
    /**
     * Sends a message under a fresh request uid and returns what the response lock yields.
     * The pending entry is always removed again, whether or not a response arrived.
     *
     * @param innerClientId the connection-level id of the target client
     * @param socketMessage the message to send; its request uid is overwritten
     * @return the value returned by {@code WaitResponseLock#getResponse()}
     */
    protected SocketMessage sendMessageForResponse(String innerClientId, SocketMessage socketMessage) {
        final String requestUid = UUID.randomUUID().toString();
        try {
            socketMessage.setRequestUid(requestUid);
            final WaitResponseLock<SocketMessage> response = new WaitResponseLock<>();
            // Register the waiter before sending so that a fast reply cannot be missed.
            responseWaitMap.put(requestUid, response);
            clientConnectionService.sendMessage(innerClientId, socketMessage);
            return response.getResponse();
        } finally {
            responseWaitMap.remove(requestUid);
        }
    }
    
    /**
     * Cleans up after a client disconnects: deletes its registry row (if present) and
     * unregisters its tasks. Connections that never registered are ignored.
     *
     * @param event the disconnect event
     */
    protected void onClientDisconnect(ClientDisconnectEvent event) {
        final String clientKey = clientInnerIdClientKeyMap.remove(event.getInnerClientId());
        if (clientKey == null) {
            return;
        }
        transactionalExecutor.executeRunnable(() -> {
            final ClientRegistryEntity clientRegistry = clientRegistryRepository.getByClientKey(clientKey);
            if (clientRegistry != null) {
                clientRegistry.getStaticTasks().clear();
                clientRegistryRepository.delete(clientRegistry);
            } else {
                log.debug("client key [{}] has no registry row to delete", clientKey);
            }
            // Stop all schedulers of this client, even when the registry row is already gone.
            taskManageService.unregister(clientKey);
        });
        
    }
    
    /**
     * Reports a connection-level exception back to the client as an error status containing
     * the cause's class name and message.
     *
     * @param event the exception event
     */
    protected void onClientException(ClientExceptionEvent event) {
        clientConnectionService.sendMessage(
                event.getInnerClientId(),
                SocketStatusMessage.error(
                        event.getRequestUid(),
                        event.getCause().getClass().getName()
                                + ":" + event.getCause().getMessage()
                )
        );
    }
    
    /**
     * Dispatches an event to the matching handler; unknown event types are logged and dropped.
     *
     * @param event the event to process
     */
    protected void processEvent(ClientConnectionEvent event) {
        if (event instanceof ClientDisconnectEvent) {
            onClientDisconnect((ClientDisconnectEvent) event);
        } else if (event instanceof ClientMessageEvent) {
            onClientMessage((ClientMessageEvent) event);
        } else if (event instanceof ClientExceptionEvent) {
            onClientException((ClientExceptionEvent) event);
        } else {
            log.warn("不支持的Client Event: {}", event);
        }
    }
    
    /**
     * Event loop: takes events until {@link #destroy()} clears the running flag and hands each
     * one to {@code coreThreadExecutor}. A failure while taking or submitting an event is logged
     * and the loop continues, so one bad iteration cannot stop event processing for good.
     */
    protected void loopEvents() {
        while (running) {
            try {
                final ClientConnectionEvent event = clientConnectionService.takeEvent();
                if (event == null) {
                    continue;
                }
                coreThreadExecutor.execute(() -> {
                    try {
                        processEvent(event);
                    } catch (Exception e) {
                        log.warn("事件处理失败", e);
                    }
                });
            } catch (RuntimeException e) {
                // E.g. the executor rejecting the task; without this the loop would end silently.
                log.error("事件循环异常", e);
            }
        }
        log.info("Loop Events End");
    }
}
